package fileServer;

import java.io.*;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;

import cmm.CMMExn_ClusterErr;
import cmm.CMMExn_TaskErr;


import filePackets.DataPacket;
import filePackets.Receiver;
import filePackets.Sender;

/**
 * Worker that streams one input file to a client as a sequence of
 * {@link DataPacket}s sent over a datagram socket.
 *
 * <p>The worker thread is created and started by the constructor. The file
 * to send is named after the task context's file number with an ".in"
 * suffix. After every block the task context's block number is advanced and
 * pushed to the owning {@link FileServerApplication}, and when the task
 * context already carries a non-zero block number the corresponding bytes
 * are skipped before sending starts.
 *
 * <p>Stopping is cooperative: {@link #kill()} sets a flag that the worker
 * checks between steps. Interrupting the worker thread while it waits
 * between packets also stops the transfer.
 */
public class TaskServiceThread implements Runnable, ITaskServiceThread {
	/** Pause between two consecutive packets, in milliseconds. */
	private static final int SLEEP_BETWEEN_PACKETS_MS = 100;

	private final Integer taskId;
	private final FileTaskContext jobCtx;
	private final Thread myThreadHandle;
	private final FileServerApplication myFileSA;
	/** Tag passed as the first argument of every log entry of this worker. */
	private final String logTag;
	/** Block buffer, reused for every packet. */
	byte[] buf = new byte[FileServerApplication.DGRAM_BUF_LEN];
	/** Written by kill() from another thread, hence volatile. */
	private volatile boolean killed;

	/**
	 * Creates the worker and immediately starts its thread.
	 *
	 * @param myFileSA application used for logging and for task updates
	 * @param taskId   id of the task being serviced
	 * @param jobCtx   task context (file number, block number, client address)
	 */
	public TaskServiceThread(FileServerApplication myFileSA, Integer taskId, FileTaskContext jobCtx) {
		this.taskId = taskId;
		this.jobCtx = jobCtx;
		this.killed = false;
		this.myFileSA = myFileSA;
		this.logTag = "FileServerThread "+taskId;
		this.myThreadHandle = new Thread(this);
		// every field is assigned before the thread can observe this object
		this.myThreadHandle.start();
	}

	/** Asks the worker to stop; it returns at its next kill check. */
	public void kill(){
		this.killed = true;
	}

	/**
	 * Sends the file block by block and finishes the task once every block
	 * has been sent. Any error, or a kill request, ends the run without
	 * finishing the task. The socket and the input file are always closed.
	 */
	@Override
	public void run() {
		String filename = jobCtx.getFileNumber()+".in";
		long length = new File(filename).length();
		// number of blocks, rounded up to cover a trailing partial block
		long maxBlocks = length/FileServerApplication.DGRAM_BUF_LEN;
		if ((length%FileServerApplication.DGRAM_BUF_LEN) != 0){
			maxBlocks++;
		}

		// create a new datagram socket to send data to the client
		DatagramSocket toClient;
		try {
			toClient = new DatagramSocket();
		} catch (SocketException e) {
			myFileSA.LogEntry(logTag,"error creating new datagram socket");
			e.printStackTrace();
			return;
		}

		boolean sent;
		try {
			sent = !killed && sendFile(filename, maxBlocks, toClient);
		} finally {
			toClient.close();
		}
		if (!sent || killed) return;

		myFileSA.LogEntry(logTag,"finished sending file to client");
		try {
			myFileSA.FinishTask(taskId);
		} catch (Exception e) {
			myFileSA.LogEntry(logTag,"error finishing TaskID "+taskId);
			e.printStackTrace();
			return;
		}
		System.out.println("finished servicing task ID: "+taskId);
	}

	/**
	 * Opens the input file, positions it after the blocks already sent and
	 * transmits the remaining blocks. The file is closed on every path.
	 *
	 * @return true if the send loop ran to its end and the file closed
	 *         cleanly; false on any error or early kill
	 */
	private boolean sendFile(String filename, long maxBlocks, DatagramSocket toClient) {
		FileInputStream inFile;
		try {
			inFile = new FileInputStream(filename);
		} catch (FileNotFoundException e) {
			myFileSA.LogEntry(logTag,"error opening input file");
			e.printStackTrace();
			return false;
		}

		boolean completed = false;
		try {
			completed = skipSentBlocks(inFile) && !killed && transmitBlocks(inFile, maxBlocks, toClient);
		} finally {
			try {
				inFile.close();
			} catch (IOException e) {
				myFileSA.LogEntry(logTag,"Error closing file");
				e.printStackTrace();
				// a failed close prevents the task from being finished
				completed = false;
			}
		}
		return completed;
	}

	/**
	 * Skips as many blocks as necessary when servicing a task that already
	 * started.
	 *
	 * @return true if the stream is positioned at the next block to send
	 */
	private boolean skipSentBlocks(FileInputStream inFile) {
		if (jobCtx.getBlockNumber() == 0) return true;

		// widen before multiplying so large offsets cannot overflow
		long bytesToSkip = (long) FileServerApplication.DGRAM_BUF_LEN * jobCtx.getBlockNumber();
		try {
			myFileSA.LogEntry(logTag,"servicing an old task - skipping "+(jobCtx.getBlockNumber())+" block(s)");
			long remaining = bytesToSkip;
			// skip() may skip fewer bytes than requested, so repeat until done
			while (remaining > 0) {
				long skipped = inFile.skip(remaining);
				if (skipped <= 0) {
					throw new EOFException("skip made no progress with "+remaining+" bytes left");
				}
				remaining -= skipped;
			}
			return true;
		} catch (IOException e) {
			myFileSA.LogEntry(logTag,"Fatal Error: cannot skip "+bytesToSkip+" bytes in file");
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * Reads blocks from the file and sends each one to the client, updating
	 * the task context after every block, until the file is exhausted, the
	 * block number reaches maxBlocks, or the worker is killed.
	 *
	 * @return true if the loop ran to its end; false if it was abandoned
	 *         because of an error, a kill request or an interrupt
	 */
	private boolean transmitBlocks(FileInputStream inFile, long maxBlocks, DatagramSocket toClient) {
		Sender mySender = new Sender();
		boolean done;

		do{
			// read a block from file
			// NOTE(review): at end of file the valid-byte count is -1 and the
			// packet is still sent, as before - confirm clients expect this
			DataPacket outData = new DataPacket();
			try {
				outData.setValidBytes(readBlock(inFile, buf));
			} catch (IOException e) {
				myFileSA.LogEntry(logTag,"Error: cannot read from File");
				e.printStackTrace();
				return false;
			}
			if (killed) return false;

			// create a DataPacket
			outData.setTaskId(taskId.intValue());
			outData.setBlockData(buf);
			outData.setBlockNumber(jobCtx.getBlockNumber());
			outData.setMaxBlocks(maxBlocks);

			try {
				mySender.WriteObjectTo(outData, toClient, jobCtx.getConnInfo().getIP(), jobCtx.getConnInfo().getPort());
			} catch (Exception e) {
				myFileSA.LogEntry(logTag,"Error: cannot write to socket");
				e.printStackTrace();
				return false;
			}
			if (killed) return false;
			myFileSA.LogEntry(logTag,"sent block number "+jobCtx.getBlockNumber()+" to "+jobCtx.getConnInfo().getIP()+":"+jobCtx.getConnInfo().getPort());

			// check if there is more data to send in the file
			try {
				done = (inFile.available() == 0);
			} catch (IOException e) {
				myFileSA.LogEntry(logTag, "Error reading file");
				e.printStackTrace();
				return false;
			}

			// update task context here and on CMM
			jobCtx.setBlockNumber(jobCtx.getBlockNumber()+1);
			try {
				myFileSA.UpdateTask(taskId, jobCtx);
			} catch (Exception e) {
				e.printStackTrace();
				myFileSA.LogEntry(logTag, "Error updating task");
				return false;
			}
			if (killed) return false;
			done = done || (jobCtx.getBlockNumber() == maxBlocks);

			try {
				Thread.sleep(SLEEP_BETWEEN_PACKETS_MS);
			} catch (InterruptedException e) {
				// keep the interrupt visible to whoever runs after us
				Thread.currentThread().interrupt();
				if (!done) {
					myFileSA.LogEntry(logTag, "interrupted between packets - stopping transfer");
					return false;
				}
				// last block already sent: fall through so the task can finish
			}

		}while (!done && !killed);

		return true;
	}

	/**
	 * Fills the block from the stream, repeating short reads so that only
	 * the last block of a file can be partial.
	 *
	 * @return number of bytes stored in the block, or -1 if the stream was
	 *         already at its end
	 */
	private static int readBlock(FileInputStream in, byte[] block) throws IOException {
		int filled = 0;
		while (filled < block.length) {
			int n = in.read(block, filled, block.length - filled);
			if (n < 0) break;
			filled += n;
		}
		return (filled == 0 && block.length > 0) ? -1 : filled;
	}

	/** Returns the thread that runs this worker. */
	public Thread getMyThreadHandle() {
		return myThreadHandle;
	}
}
